package com.wuwangfu.chain;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author jcshen
 * @Date 2023-02-27
 * @PackageName:com.wuwangfu.chain
 * @ClassName: StartNewChain
 * @Description:
 * @Version 1.0.0
 *
 * https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
 *
 */
public class StartNewChain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //默认开启OperatorChain
        //env.disableOperatorChaining();
        DataStreamSource<String> line = env.socketTextStream("locahost", 8888);
        SingleOutputStreamOperator<String> words = line.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        SingleOutputStreamOperator<String> filtered = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.startsWith("error");
            }
        })
        //.startNewChain()//开启一个新链
                ;

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        }).startNewChain();//开启一个新链

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);
        keyed.sum(1).print();

        env.execute();
    }
}
